package com.watertek.bdcenter.bdcode.dao.impl;

import com.alibaba.druid.pool.DruidPooledConnection;
import com.watertek.bdcenter.bdcode.dao.HiveDao;
import com.watertek.bdcenter.bdcode.util.DBConnectionPool;
import com.watertek.bdcenter.bdcode.util.HiveUtil;
import com.watertek.bdcenter.bdcode.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;

import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;

@Repository
public class HiveDaoImpl extends AbstractDao implements HiveDao {

    private static final Logger logger = LoggerFactory.getLogger(HiveDaoImpl.class);

    /**
     * Manually assigns a BeiDou grid code (bd_code) to a single row of a Hive table.
     *
     * @param databaseName database name (currently unused; kept for interface compatibility)
     * @param tableName    target table; JDBC cannot bind identifiers, so this is
     *                     concatenated into the SQL and must come from a trusted source
     * @param id           primary key of the row to update
     * @param bdCode       BeiDou grid code to store
     */
    @Override
    public void manualCode(String databaseName, String tableName, String id, String bdCode) {
        String sql = "update " + tableName + " set bd_code=? where id=?";
        // try-with-resources: closing the pooled connection returns it to the pool
        try (DruidPooledConnection con = DBConnectionPool.getInstance().getConnection();
             PreparedStatement ps = con.prepareStatement(sql)) {
            ps.setString(1, bdCode);
            ps.setString(2, id);
            ps.executeUpdate();
        } catch (SQLException e) {
            logger.error("manualCode failed, table={}, id={}", tableName, id, e);
        }
    }

    /**
     * Reads column metadata for the given table via JDBC {@link DatabaseMetaData}.
     *
     * @param tableName table to inspect
     * @return map of column name to type name; {@code null} when the lookup fails
     *         (kept for backward compatibility — existing callers null-check)
     */
    public Map<String, String> getMetadataMap(String tableName) {
        logger.info("getMetadataMap method start");

        Map<String, String> map = new HashMap<>();
        try (DruidPooledConnection con = DBConnectionPool.getInstance().getConnection()) {
            DatabaseMetaData metaData = con.getMetaData();
            try (ResultSet rs = metaData.getColumns(null, "%", tableName, null)) {
                while (rs.next()) {
                    map.put(rs.getString("COLUMN_NAME"), rs.getString("TYPE_NAME"));
                }
            }
            return map;
        } catch (SQLException e) {
            logger.error("getMetadataMap failed, table={}", tableName, e);
        }
        return null;
    }

    /**
     * Creates a transactional ORC table with the given columns
     * (clustered by id into 2 buckets). No-op when {@code map} is null or empty.
     *
     * @param tableName name of the table to create (trusted identifier)
     * @param map       column name -> Hive type
     */
    public void createOrcTable(String tableName, Map<String, String> map) {
        logger.info("createOrcTable method start");

        if (map == null || map.isEmpty()) {
            return;
        }
        List<String> columnDefs = new ArrayList<>(map.size());
        for (Map.Entry<String, String> entry : map.entrySet()) {
            columnDefs.add(entry.getKey() + " " + entry.getValue());
        }
        String sql = "CREATE TABLE " + tableName + "(" + String.join(",", columnDefs)
                + ") clustered by (id) into 2 buckets stored as orc TBLPROPERTIES('transactional'='true')";
        try (DruidPooledConnection con = DBConnectionPool.getInstance().getConnection();
             PreparedStatement ps = con.prepareStatement(sql)) {
            ps.execute();
        } catch (SQLException e) {
            logger.error("createOrcTable failed, table={}", tableName, e);
        }
    }

    /**
     * Persists table/column metadata asynchronously: one background thread inserts
     * the table record into metadata_table, another inserts one row per column
     * into metadata_column. Fire-and-forget — errors are logged, not propagated.
     *
     * @param tableName logical table name
     * @param map       column name -> type
     */
    @Override
    public void storeMetadata(String tableName, Map<String, String> map) {
        String tableUuid = StringUtil.uuid();

        // insert the table record
        new Thread(() -> {
            String sql = "insert into metadata_table(id, name) values(?, ?)";
            try (DruidPooledConnection con = DBConnectionPool.getInstance().getConnection();
                 PreparedStatement ps = con.prepareStatement(sql)) {
                ps.setString(1, tableUuid);
                ps.setString(2, tableName);
                ps.executeUpdate();
            } catch (SQLException e) {
                logger.error("storeMetadata: metadata_table insert failed, table={}", tableName, e);
            }
        }).start();

        // insert one record per column, reusing a single prepared statement
        new Thread(() -> {
            String sql = "insert into metadata_column(id, name, type, table_id) values(?, ?, ?, ?)";
            try (DruidPooledConnection con = DBConnectionPool.getInstance().getConnection();
                 PreparedStatement ps = con.prepareStatement(sql)) {
                for (Map.Entry<String, String> entry : map.entrySet()) {
                    ps.setString(1, StringUtil.uuid());
                    ps.setString(2, entry.getKey());
                    ps.setString(3, entry.getValue());
                    ps.setString(4, tableUuid);
                    ps.executeUpdate();
                }
            } catch (SQLException e) {
                logger.error("storeMetadata: metadata_column insert failed, table={}", tableName, e);
            }
        }).start();
    }

    /**
     * Copies every row from originTable into destTable, computing the bd_code
     * column with the Hive UDF getgcode1dofpoint(lon, lat, layer).
     * No-op (with a warning) when the source table's metadata cannot be read.
     *
     * @param originTable source table; assumed to have lon/lat columns — TODO confirm
     * @param destTable   destination table (same columns plus bd_code)
     * @param layer       grid level passed to the encoding UDF
     */
    @Override
    public void insertOrcTable(String originTable, String destTable, String layer) {
        logger.info("insertOrcTable method start");

        Map<String, String> metadataMap = getMetadataMap(originTable);
        if (metadataMap == null || metadataMap.isEmpty()) {
            // without columns the generated SQL would be malformed; bail out early
            logger.warn("insertOrcTable: no metadata for table {}, nothing to insert", originTable);
            return;
        }
        String columns = String.join(",", metadataMap.keySet());
        String sql = "insert into " + destTable + "(" + columns + ", bd_code"
                + ") select " + columns + ", getgcode1dofpoint(lon, lat, " + layer + ") from " + originTable;
        try (DruidPooledConnection con = DBConnectionPool.getInstance().getConnection();
             PreparedStatement ps = con.prepareStatement(sql)) {
            ps.executeUpdate();
        } catch (SQLException e) {
            logger.error("insertOrcTable failed, origin={}, dest={}", originTable, destTable, e);
        }
    }

    /**
     * Drops the given table.
     *
     * @param tableName table to drop (trusted identifier)
     */
    @Override
    public void dropTable(String tableName) {
        logger.info("dropTable method start");

        try (DruidPooledConnection con = DBConnectionPool.getInstance().getConnection();
             PreparedStatement ps = con.prepareStatement("drop table " + tableName)) {
            ps.executeUpdate();
        } catch (SQLException e) {
            logger.error("dropTable failed, table={}", tableName, e);
        }
    }

    /**
     * Fetches all rows of a table.
     *
     * @param tableName table to query (trusted identifier)
     * @return rows converted by {@link HiveUtil#convertList}; {@code null} on failure
     */
    public List findAll(String tableName) {
        logger.info("findAll method start");

        try (DruidPooledConnection con = DBConnectionPool.getInstance().getConnection();
             PreparedStatement ps = con.prepareStatement("select * from " + tableName);
             ResultSet rs = ps.executeQuery()) {
            return HiveUtil.convertList(rs);
        } catch (SQLException e) {
            logger.error("findAll failed, table={}", tableName, e);
        }
        return null;
    }

    /**
     * Fetches the row(s) of a table matching the given id.
     * The id is bound as a statement parameter to prevent SQL injection.
     *
     * @param tableName table to query (trusted identifier)
     * @param id        id value to match
     * @return matching rows; {@code null} on failure
     */
    public List findById(String tableName, String id) {
        logger.info("findById method start");

        try (DruidPooledConnection con = DBConnectionPool.getInstance().getConnection();
             PreparedStatement ps = con.prepareStatement("select * from " + tableName + " where id=?")) {
            ps.setString(1, id);
            try (ResultSet rs = ps.executeQuery()) {
                return HiveUtil.convertList(rs);
            }
        } catch (SQLException e) {
            logger.error("findById failed, table={}, id={}", tableName, id, e);
        }
        return null;
    }

    /**
     * Fetches the row(s) of a table matching the given BeiDou grid code.
     * The code is bound as a statement parameter to prevent SQL injection.
     *
     * @param tableName table to query (trusted identifier)
     * @param bdCode    BeiDou grid code to match
     * @return matching rows; {@code null} on failure
     */
    public List findByBdCode(String tableName, String bdCode) {
        logger.info("findByBdCode method start");

        try (DruidPooledConnection con = DBConnectionPool.getInstance().getConnection();
             PreparedStatement ps = con.prepareStatement("select * from " + tableName + " where bd_code=?")) {
            ps.setString(1, bdCode);
            try (ResultSet rs = ps.executeQuery()) {
                return HiveUtil.convertList(rs);
            }
        } catch (SQLException e) {
            logger.error("findByBdCode failed, table={}, bdCode={}", tableName, bdCode, e);
        }
        return null;
    }

    /**
     * Pages through a table using row_number() over (order by id).
     *
     * @param tableName table to query (trusted identifier)
     * @param startNo   first row number of the page (1-based)
     * @param maxCount  upper bound passed to the BETWEEN clause
     * @return rows of the page; {@code null} on failure
     */
    public List findByPage(String tableName, int startNo, int maxCount) {
        logger.info("findByPage method start");

        // NOTE(review): the BETWEEN upper bound is maxCount taken as an absolute
        // row number, not startNo + maxCount - 1; this mirrors the original
        // behavior, but the setMaxRows() cap below suggests a page size was
        // intended — confirm against callers.
        String sql = "select * from (select row_number() over (order by id) as rnum, "
                + tableName + ".* from " + tableName + ")t where rnum between ? and ?";
        try (DruidPooledConnection con = DBConnectionPool.getInstance().getConnection();
             PreparedStatement ps = con.prepareStatement(sql,
                     ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY)) {
            // cap the cursor at the last row of the requested page
            ps.setMaxRows(startNo + maxCount - 1);
            ps.setInt(1, startNo);
            ps.setInt(2, maxCount);
            try (ResultSet rs = ps.executeQuery()) {
                return HiveUtil.convertList(rs);
            }
        } catch (SQLException e) {
            logger.error("findByPage failed, table={}", tableName, e);
        }
        return null;
    }

}
